package com.spark.stream


import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Minutes, Seconds, StreamingContext}






/**
  * Created by Administrator on 2017/5/19.
  *
  * Demo driver: consumes the Kafka topic "test" through the direct-stream API
  * and prints the leading records of every 5-second batch to stdout.
  */
object streamingCount {

  /**
    * Entry point. Builds a local streaming context, subscribes to Kafka, starts
    * the job and blocks until the streaming context terminates.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Local import so this block is self-contained; the decoder classes come
    // from the Kafka client library that the Kafka direct-stream API is built on.
    import kafka.serializer.StringDecoder

    // NOTE(review): the master is set explicitly here, which takes precedence
    // over any --master passed to spark-submit.
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("kafka-spark-demo")

    // One micro-batch every 5 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topics = Set("test")
    val kafkaParams = Map(
      // NOTE(review): 9200 is not Kafka's default port (9092) — confirm the broker address.
      "metadata.broker.list" -> "192.168.121.44:9200" // Kafka broker list address
    )

    // The key/value types and their decoders must be given explicitly: none of
    // the type parameters appear in the argument list, so without them the
    // compiler infers Nothing and the decoder cannot be instantiated at runtime.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
